function f_kill_sql_for_mysql()
{ #大查询执行太久的话(10s超时), 就会触发kill掉的操作
    declare _table_name _db_name _concat_str
    _db_name="${1}"
    _table_name="${2}"
    _concat_str="${3}"
    kill_id_list_sql="select ${_concat_str} from information_schema.processlist where user = '${mysql_user}' and db = '${_db_name}'
        and (host like '${localhost_ip}:%' or host = '${localhost_ip}')
        and info like 'SELECT %${sql_md5_str}:%AS crc FROM (%${_db_name}._${_table_name}_%';"
    kill_id_list="$(timeout 10s ${mysql_comm} -NBe "${kill_id_list_sql}" 2>/dev/null|timeout 10s ${mysql_comm} -NB 2>/dev/null)"
}

function f_get_pk_list()
{
    #如果没有主键, 有唯一键且非空, 也是可以的
    declare tmp tmp1 tmp2 _sql _table _db _pri_name pri_name

    tmp="true"

    _db="${1}"
    _table="${2}"
    _sql="select COLUMN_NAME from information_schema.columns where table_name = '${_table}' and table_schema = '${_db}' and COLUMN_KEY = 'PRI';"
    pri_name="$(${mysql_comm} -NBe "show create table \`${_table}\`\G" 2>/dev/null|grep -P "^[ ]*PRIMARY KEY "|sed 's/`//g')" || true
    if [ -n "${pri_name}" ]
    then
        global_pri_name="$(sed 's/.*(\(.*\)).*/\1/g' <<< "${pri_name}")"
        return 0
    fi

    _pri_name=($(${mysql_comm} -NBe "${_sql}" 2>/dev/null)) || true
    if [ "${#pri_name[@]}x" == "0x" ]
    then
        pri_name=()
        return 0
    fi

    pri_name=($(${mysql_comm} -NBe "show create table \`${_table}\`\G" 2>/dev/null|grep -P "^[ ]*UNIQUE KEY "|sed 's/`//g'|sed 's/.*(\(.*\)).*/\1,/g')) || true

    for tmp1 in ${pri_name[@]}
    do #遍历所有唯一索引组
        for tmp2 in ${_pri_name[@]}
        do #将pri字段匹配唯一索引组, 如果所有主键的单个字段都在唯一索引组里面就表示这个唯一索引组就是主键
            grep -c "${tmp2}," <<< "${tmp1}" >/dev/null || tmp="false"
        done

        [ "${tmp}x" == "truex" ] && global_pri_name="$(sed 's/,$//g' <<< "${tmp1}")" && break

    done

    [ "${tmp}x" != "truex" ] && global_pri_name=""

    return 0
}

function f_get_sql_for_where()
{
    #拼接where条件, 主要是针对联合主键
    declare _pk _pk_file
    _pk=($(tr "," " " <<< "${1}")) #主键列表
    _pk_file="${2}"                #保存主键的临时文件

    declare -a _sql
    _sql[0]="where 1 = 1"
    _sql[1]="${_pk[0]} = $(tail -1 ${_pk_file}|awk -F'\t' '{print $1}')"
    _sql[2]="${_pk[0]} > $(tail -1 ${_pk_file}|awk -F'\t' '{print $1}')"
    _sql[3]="${_pk[1]} = $(tail -1 ${_pk_file}|awk -F'\t' '{print $2}')"
    _sql[4]="${_pk[1]} > $(tail -1 ${_pk_file}|awk -F'\t' '{print $2}')"
    _sql[5]="${_pk[2]} > $(tail -1 ${_pk_file}|awk -F'\t' '{print $3}')"

    if [ "${#_pk[@]}" -gt 2 ]
    then #三个字段的联合主键

        _sql[6]="(${_sql[1]} and ${_sql[3]} and ${_sql[5]}) or (${_sql[1]} and ${_sql[4]}) or (${_sql[2]})"
        _sql[0]="${_sql[0]} and (${_sql[6]})"

    elif [ "${#_pk[@]}" -gt 1 ]
    then #两个字段的联合主键

        _sql[6]="(${_sql[1]} and ${_sql[4]}) or (${_sql[2]})"
        _sql[0]="${_sql[0]} and (${_sql[6]})"

    elif [ "${#_pk[@]}" -gt 0 ]
    then #两个字段的联合主键
        _sql[0]="${_sql[0]} and ${_pk[0]} > $(tail -1 ${_pk_file})"
    fi

    global_where="${_sql[0]}"

    return 0
}

function f_check_time()
{
    declare check_mark s_time e_time
    check_mark=1
    for _check_time in $(tr "," " " <<< "${check_time}")
    do
        s_time="$(awk -F'-' '{print $1}' <<< "${_check_time}")"
        if [ "$(grep -c -- "-" <<< "${_check_time}")x" == "1x" ]
        then
            e_time="$(awk -F'-' '{print $2}' <<< "${_check_time}")"
        else
            e_time="${s_time}"
        fi

        if [ "$(date +%H)" -ge "${s_time}" ] && [ "$(date +%H)" -le "${e_time}" ]
        then
            check_mark="0"
            break
        fi
    done

    return ${check_mark}
}

function f_check_continue()
{ #网络判断, 如果启用了网络检测脚本, 当网络达到阈值的时候就会生成pause文件, 然后就会sleep直到网络恢复
    while :
    do
        [ -f "${stop_file}" ] && break
        [ -z "${check_time}" ] && break
        [ "${his_time}x" == "$(date +%H)x" ] && break #|| his_time="$(date +%M)"

        if f_check_time
        then
            his_time="$(date +%H)"
            break
        elif [ "$(($(date +%s)%900))" -eq 0 ]
        then
            f_logging "$(eval echo ${log_addr}):WARN" "因执行时间 '$(date +%H)' not in '${check_time}' 不满足, 校验任务已经暂停......"
        fi

        sleep 1
    done

    [ ! -f "${pause_file}" ] && return 0
    while :
    do
        [ -f "${stop_file}" ] && break
        [ ! -f "${pause_file}" ] && break
        sleep 1

        if [ "$(($(date +%s)%900))" -eq 0 ]
        then
            f_logging "$(eval echo ${log_addr}):WARN" "校验任务已经暂停......"
        fi
    done
}

function f_get_check_sql()
{   #拼接检查sql, 最终会返回一条完整的sql, 作为单次校验的逻辑sql
    declare _table_name _db_name _where_str col_str col_str_isnull
    _table_name="${1}"
    _db_name="${2}"
    _where_str="${3}"
    #拼接成对比的sql
    col_str="$(${mysql_comm} -NBe "desc ${_db_name}.\`${_table_name}\`;" 2>/dev/null|awk '{print "`"$1"`"}'|tr "\n" ",")"
    col_str_isnull="$(${mysql_comm} -NBe "desc ${_db_name}.\`${_table_name}\`;" 2>/dev/null|awk '{print "ISNULL(`"$1"`)"}'|tr "\n" ","|sed 's#\(.*\),$#\1#g')"

    _sql="set tx_isolation='REPEATABLE-READ';set innodb_lock_wait_timeout=1"
    _sql="${_sql};SELECT '${sql_md5_str}:${_table_name}', COUNT(*) AS cnt, COALESCE(LOWER(CONV(BIT_XOR(CAST(CRC32(CONCAT_WS('#',${col_str}"
    _sql="${_sql}CONCAT(${col_str_isnull}))) AS UNSIGNED)), 10, 16)), 0) AS crc FROM"
    check_sql="${_sql} (select * from ${_db_name}.\`${_table_name}\` ${_where_str} )a;"
    echo "${check_sql}"
}

function f_check_diff_for_mysql()
{
    db_name="${1}"
    mysql_comm="${mysql_path} -u${mysql_user} -p${mysql_passwd} -h${host_list[0]} -P${mysql_port1} ${db_name} -A"

    declare -a table

    if [ -z "${check_table}" ]
    then #处理忽略检查逻辑, 根据条件忽略哪些表
        __="select table_name from information_schema.tables where table_schema = '${db_name}' order by table_rows desc;"
        table_tmp=($(${mysql_comm} -NBe "${__}" 2>/dev/null)) || true
        for i in ${table_tmp[@]}
        do
            is_skip="$(tr "," "\n" <<< "${skip_check_table}"|grep -Pic "(^${i}$)|(^${db_name}\.${i}$)")"
            if [ "${is_skip}x" == "0x" ]
            then
                table+=(${i})
            fi
        done
    else
        table=($(echo ${check_table}|tr "," "\n"|grep -Pi "(^${db_name}\.)"|sed "s/^${db_name}\.//g"))
        table=($(echo ${check_table}|tr "," "\n"|grep -v "\.") ${table[@]})
    fi

    error_res_log="${log_dir}/error_exe_mysql"
    error_log_table_dir="${res_table_dir}/${db_name}"
    mkdir -p ${pri_file_dir}/${db_name}
    mkdir -p ${error_log_table_dir}

    for ((i=0;i<${#table[@]};i++))
    do
        [ -f "${stop_file}" ] && break
        f_check_continue
        [ ${i} -gt ${THREAD_NUM} ] && ii="$((${i}%${THREAD_NUM}))" || ii="${i}"
        table_check_stat="$(grep -c "^${db_name}\.${table[${i}]}$" ${table_list_file})" || true         #检查完成会放在这个文件
        table_check_stat_ing="$(grep -c "^${db_name}\.${table[${i}]}$" ${table_list_file_ing})" || true #正在检查的会放在这个文件

        if [ "${table_check_stat}x" != "0x" ]
        then
            f_logging "$(eval echo ${log_addr}):WARN" "THREAD ${ii} : [ table : ${table[${i}]} ] 该表已经检查完毕"
            continue  #执行完就往3里面写一行，这样就实现并行了
        elif [ "${table_check_stat_ing}x" != "0x" ]
        then
            f_logging "$(eval echo ${log_addr}):WARN" "THREAD ${ii} : [ table : ${table[${i}]} ] 已经有任务在检查该表了"
            continue  #执行完就往3里面写一行，这样就实现并行了
        fi

        read -u 3  #读取3里面的一行，读一行就少一行
        {
            sleep ${ii}
            start_time="$(date +%s)"
            echo "${db_name}.${table[${i}]}" >> ${table_list_file_ing} #记录一下正在检查的表

            mysql_comm="${mysql_path} -u${mysql_user} -p${mysql_passwd} -h${host_list[1]} -P${mysql_port2} ${db_name} -A"

            #判断表结构是否一致
            #记录host2的该表的表结构, 后面会判断跟host1是否一致
            table_structure="$(f_get_check_sql "columns" "information_schema" "where table_name = '${table[${i}]}' and table_schema = '${db_name}' order by COLUMN_NAME")"
            host2_table_structure="$(${mysql_comm} -NBe "${table_structure}" 2>/dev/null|md5sum|awk '{print $1}')"

            #判断host2是否有这个百奥
            host2_is_has_table="select count(*) from information_schema.tables where table_name = '${table[${i}]}' and table_schema = '${db_name}';"
            host2_is_has_table="$(${mysql_comm} -NBe "${is_has_table}" 2>/dev/null)"

            #记录host1的该表的表结构, 结合前面的host2的表结构判断是否一致
            mysql_comm="${mysql_path} -u${mysql_user} -p${mysql_passwd} -h${host_list[0]} -P${mysql_port1} ${db_name} -A"
            host1_table_structure="$(${mysql_comm} -NBe "${table_structure}" 2>/dev/null|md5sum|awk '{print $1}')"

            thread_info="${db_name}.${table[${i}]} ] ["                     #日志的格式

            if [ "${skip_check_structure}x" == "truex" ]
            then
                host1_table_structure="${host2_table_structure}"
            fi

            if [ "${host1_table_structure}x" != "${host2_table_structure}x" ] || [ "${host1_table_structure}x" == "d41d8cd98f00b204e9800998ecf8427ex" ]
            then #两个md5不一致就是表示表结构不一致,  d41d8cd98f00b204e9800998ecf8427e 表示没有结果
                table_structure_tmp="$(awk -F' FROM ' '{print "select * from"$2}' <<< "${table_structure}")"
                mysql_comm="${mysql_path} -u${mysql_user} -p${mysql_passwd} -h${host_list[1]} -P${mysql_port2} ${db_name} -A"
                tmp_file1="${log_dir}/.${db_name}.${table[${i}]}.1.tmp.log"
                tmp_file2="${log_dir}/.${db_name}.${table[${i}]}.2.tmp.log"
                ${mysql_comm} -NBe "${table_structure_tmp}" > ${tmp_file1} 2>/dev/null
                mysql_comm="${mysql_path} -u${mysql_user} -p${mysql_passwd} -h${host_list[0]} -P${mysql_port1} ${db_name} -A"
                ${mysql_comm} -NBe "${table_structure_tmp}" > ${tmp_file2} 2>/dev/null
                diff_col=($(diff ${tmp_file1} ${tmp_file2}|grep "<"|awk '{print $5}'))
                f_logging "$(eval echo ${log_addr}):WARN" "${thread_info} 表结构不一致 ] [ ${diff_col[*]} ] [ 跳过该表的检查" "2"
                echo "${db_name}.${table[${i}]}:table structure err" >> ${skip_file}
                [ -f "${tmp_file1}" ] && rm -f "${tmp_file1}"
                [ -f "${tmp_file2}" ] && rm -f "${tmp_file2}"

                echo 1>&3 && continue  #执行完就往3里面写一行，这样就实现并行了
            else
                f_logging "$(eval echo ${log_addr}):INFO" "${thread_info} 表结构一致" "2"
            fi

            table_row_sql="select TABLE_ROWS from information_schema.tables where table_name = '${table[${i}]}' and table_schema = '${db_name}';"
            table_row="$(${mysql_comm} -NBe "${table_row_sql}" 2>/dev/null)" #粗鲁获取表的总行数, 用来判断检查进度

            [ "${table_row}" -lt 100 ] && table_row="$(${mysql_comm} -NBe "select count(*) from ${db_name}.\`${table[${i}]}\`" 2>/dev/null)"

            #查找当前表的主键字段名列表
            f_get_pk_list "${db_name}" "${table[${i}]}" #返回global_pri_name

            pri_name=($(tr "," " " <<< "${global_pri_name}"))

            if [ "${#pri_name[@]}x" == "0x" ]
            then #没有主键就跳过检查
                f_logging "$(eval echo ${log_addr}):WARN" "${thread_info} 没有主键, 跳过该表的检查" "2"
                echo "${db_name}.${table[${i}]}:no pk" >> ${skip_file}
                echo 1>&3 && continue  #执行完就往3里面写一行，这样就实现并行了
            elif [ "${host2_is_has_table}x" == "0x" ]
            then
                f_logging "$(eval echo ${log_addr}):WARN" "${thread_info} ${host_list[1]}:${mysql_port2} 没有该表, 跳过该表的检查" "2"
                echo "${db_name}.${table[${i}]}:${host_list[1]}:${mysql_port2} no table" >> ${skip_file}
                echo 1>&3 && continue  #执行完就往3里面写一行，这样就实现并行了
            fi

            declare -A pri_type #主键字段的数据类型字典

            for tmp in ${pri_name[@]}
            do #将各个主键字段的数据类型记录到字典, 后文会判断主键字段的类型是否是字符型, 如果是字符型就需要特殊处理(带引号)
                __="select DATA_TYPE from information_schema.columns where table_schema = '${db_name}' and table_name = '${table[${i}]}'
                    and COLUMN_KEY = 'PRI' and COLUMN_NAME = '${tmp}'"
                pri_type["${tmp}"]="$(${mysql_comm} -NBe "${__}" 2>/dev/null)"
            done

            pri_count="${#pri_name[@]}"                                     #主键个数, 是否是联合主键

            if [ ${pri_count} -gt 3 ]
            then #联合主键至多支持三个字段, 再多就忽视, 或者自行修改逻辑
                f_logging "$(eval echo ${log_addr}):WARN" "${thread_info}联合主键[ ${pri_name[*]} ] 字段太多, 跳过该表检查" "2"
                echo "${db_name}.${table[${i}]}:pk col to long" >> ${skip_file}
                echo 1>&3 && continue  #执行完就往3里面写一行，这样就实现并行了
            fi

            tmp_file="${pri_file_dir}/${db_name}/pri_${table[${i}]}.log"     #临时文件, 每次从库里将数据取出来存放这个文件, 作为下次取数的判断条件

            error_log_table_file="${error_log_table_dir}/${table[${i}]}.log" #执行失败存放临时文件

            [ -f "${error_log_table_file}" ] && mv "${error_log_table_file}" "${error_log_table_file}.$(date +%s)"

            select_str=""

            [ "${pri_name}x" == "NULLx" ] && pri_count=0

            for tmp in ${pri_name[@]}
            do #这步主要是解决将主键弄清楚, 到底是单列主键还是多列, 到底是整型还是其他, 然后根据不同的类型拼接成一个字符串, 主要是用来作为后面取数是否要加单引号
               #因为整型取出来不用做特殊处理, 但是非整型需要加上单引号, 要不然作为where条件对比的时候肯定有问题
                if [ "$(grep -ic "int" <<< "${pri_type["${tmp}"]}")x" != "1x" ]
                then
                    select_str_tmp="concat(\"'\",${tmp},\"'\")"
                    pk_style="str"
                else
                    select_str_tmp="${tmp}"
                fi

                if [ -z "${select_str}" ]
                then
                    select_str="${select_str_tmp}"
                else
                    select_str="${select_str},${select_str_tmp}"
                fi

            done

            pri_name_tmp="$(echo "${pri_name[@]}"|tr " " ",")"
            pri_name="(${pri_name_tmp})"
            order_by_str="order by ${pri_name_tmp}"                                      #tidb是分布式, 需要通过order by保证数据的顺序

            _select_pk="${pri_name_tmp}"

            start_count=0
            end_count=0
            error_count=0       #当遇到数据不一致的时候, 重试10次, 有任何一次一致就算一致
            md5sum_ok_count=0   #保存md5值相同的次数, 如果检查到数据不一致的时候(可能是因为这部分数据是热点数据, 正在更新), 会进行再次检查
                                #如果之前的md5和现在的md5是一致的, 如果连续3次, 就认为是数据不一致

            error_mark="false"  #当遇到不一致就置为true

            #遍历整个表的数据, 第一次是先获取最小的一行, 然后将这行数据的主键记录到临时文件
            #然后遍历的时候是读取这个文件将上次取数的最后一个主键拿出来作为本次取数的第一行(其实是大于开区间)
            while :
            do
                [ -f "${stop_file}" ] && break
                f_check_continue
                if [ -z "${_min_rowid}" ]
                then #拿出当前表的最小行id, 每次拿数据的起始行
                    _min_rowid="select ${select_str} from \`${table[${i}]}\` ${order_by_str} limit 1;"
                    _min_rowid="$(${mysql_comm} -NBe "${_min_rowid}" 2>/dev/null)"
                    global_where="$(echo "where ${pri_name} in (($(tr "\t" "," <<< "${_min_rowid}")))")"

                elif [ "${error_mark}x" == "falsex" ] || [ "${error_count}" -gt 8 ] || [ "${md5sum_ok_count}" -gt 3 ]
                then
                    f_get_sql_for_where "${pri_name_tmp}" "${tmp_file}"
                    error_count=0       #当遇到数据不一致的时候, 重试10次, 有任何一次一致就算一致 , 这里可以理解为是初始化
                    md5sum_ok_count=0
                    error_mark="false"  #当遇到不一致就置为true
                else
                    error_count="$((${error_count}+1))"
                fi

                if [ -z "${_min_rowid}" ]
                then
                    f_logging "$(eval echo ${log_addr}):WARN" "${thread_info} 是空表, 跳过该表检查" "2"
                    echo "${db_name}.${table[${i}]}" >> ${table_list_file}
                    break
                fi

                if [ "${pri_count}x" != "0x" ]
                then #这步是拼接取数sql
                    #每次将结果集的行数拿出来, 作为日志显示的步长
                    sql_str_tmp="select ${select_str} from \`${table[${i}]}\` ${global_where} ${order_by_str} limit ${max_count};"
                    sql_str_tmp_count="select count(*) from (select * from \`${table[${i}]}\` ${global_where} ${order_by_str} limit ${max_count})a;"
                    exe_state=0
                    error_res_log_file="${error_res_log}.THREAD.${table[${i}]}.log" #错误记录临时文件

                    echo "${sql_str_tmp}"|${mysql_comm} -NB 2>/dev/null|tail -1 > ${tmp_file} 2>${error_res_log_file} || exe_state="${?}" #取数

                    max_count_tmp="$(${mysql_comm} -NBe "${sql_str_tmp_count}" 2>/dev/null)" || true

                    _min_rowid="$(tail -1 ${tmp_file})" #每次都取本次最后一行作为下一次的起始行

                    if [ -f "${error_res_log_file}" ]
                    then
                        err_info="$(sed 's/.*Using a password.*//g' ${error_res_log_file}|grep -P "\w")" || true
                        rm -f "${error_res_log_file}"
                    fi

                    if [ ${exe_state} -ne 0 ]
                    then
                        f_logging "$(eval echo ${log_addr}):ERROR" "${thread_info} ${sql_str_tmp} ] [ SQL语句语法错误 ] [ ERR : ${err_info} ] "
                        echo "${db_name}.${table[${i}]}:syntax error" >> ${skip_file}
                        unset err_info
                        break  #执行完就往3里面写一行，这样就实现并行了
                    elif [ ! -s "${tmp_file}" ]
                    then
                        sleep 0.1
                        break
                    fi

                fi

                [ -f "${stop_file}" ] && break

                f_check_continue

                where_str="${global_where} ${order_by_str} limit ${max_count}"

                _sql="$(f_get_check_sql "${table[${i}]}" "${db_name}" "${where_str}")" #获取检查的sql

                md5sum_file1="${md5_dir}/${host_list[0]}_${mysql_port1}_${db_name}_${table[${i}]}.log"
                md5sum_file2="${md5_dir}/${host_list[1]}_${mysql_port2}_${db_name}_${table[${i}]}.log"
                touch ${md5sum_file1} ${md5sum_file2}
                for ((j=0;j<${#host_list[@]};j++))
                do #向host1 host2同时去数, 判断md5是否一致(不是同时, 但是可以理解为是同时)
                {
                    if [ "${j}x" == "0x" ]
                    then
                        mysql_port="${mysql_port1}"
                    else
                        mysql_port="${mysql_port2}"
                    fi
                    mysql_comm="${mysql_path} -u${mysql_user} -p${mysql_passwd} -h${host_list[${j}]} -P${mysql_port} ${db_name} -A"
                    tmp_res_file="${md5_dir}/${host_list[${j}]}_${mysql_port}_${db_name}_${table[${i}]}.log"
                    for ((t=0;t<=30;t++))
                    do
                        [ -f "${stop_file}" ] && break
                        f_check_continue
                        exe_state=0
                        { echo "${_sql}"|timeout 20s ${mysql_comm} -NB 2>/dev/null|awk '{print $1":"$2":"$3}' > ${tmp_res_file};} || exe_state="${?}"
                        if [ -s "${tmp_res_file}" ]
                        then
                            break
                        elif [ ${exe_state} -eq 124 ]
                        then
                            f_logging "$(eval echo ${log_addr}):WARN" "${thread_info} ${host_list[${j}]}:${mysql_port}:${_sql} ] [ 查询超时, 5秒后将重试"
                            sleep 5
                        fi

                        if [ ${t} -gt 20 ]
                        then
                            __="$(echo "${_sql}"|timeout 10s ${mysql_comm} -NB 2>&1|awk '{print $1":"$2":"$3}'|grep -v "Using a password")" || true
                            __="执行失败, 失败原因 : ${__}"
                            f_logging "$(eval echo ${log_addr}):ERROR" "${thread_info} ${host_list[${j}]}:${_sql} ] [ ${__}"
                            echo "${db_name}.${table[${i}]}:error" >> ${skip_file}
                            break
                        fi

                        is_tidb="$(grep -c -- "-TiDB-" <<< "${mysql_ver}")" || true

                        if [ "${is_tidb}x" == "1x" ]
                        then #拼接killsql, 避免长时间不返回结果
                            concat_str="concat('kill tidb ',id,';')"
                        else
                            concat_str="concat('kill ',id,';')"
                        fi
                        #kill掉本次查询的sql, 因为标注了32位的md5值, 所以不用担心kill错
                        f_kill_sql_for_mysql "${db_name}" "${table[${i}]}" "${concat_str}"
                    done
                    f_kill_sql_for_mysql "${db_name}" "${table[${i}]}" "${concat_str}"
                } &
                done
                wait

                if [ "${error_mark}x" == "falsex" ]
                then #false就是说检查结果一致, 步长就往上加
                    start_count="$((${end_count}+1))"
                    end_count=$((${max_count_tmp}+${end_count}))
                fi

                md5sum1_his="${md5sum1}"
                md5sum2_his="${md5sum2}"

                md5sum1="$(md5sum ${md5sum_file1}|awk '{print $1}')"
                md5sum2="$(md5sum ${md5sum_file2}|awk '{print $1}')"
                show_region="${start_count},${end_count}"

                if [ "${md5sum2_his}x" == "${md5sum2}x" ]
                then
                    md5sum_ok_count="$((${md5sum_ok_count}+1))"
                    sleep 0.2
                fi

                #根据information_schema读取出来的数据行数不准确, 所以table_row不能小于end_count
                [ "${table_row}" -lt "${end_count}" ] && table_row="$((${end_count}+1))"

                rate_per="$(awk '{printf("%.2f%\n",$2/$1*100)}' <<< "${table_row} ${end_count}")" #进度
                rate_time="$(f_switch_time "$(($(date +%s)-${start_time}))")"                     #进度

                if [ "${md5sum1}x" == "${md5sum2}x" ] && [ -s "${md5sum_file1}" ]
                then
                    skip_count="$(wc -l 2>/dev/null < ${skip_file})" || skip_count=0 #检查错误的表的个数
                    __="${rate_time}] [ ${rate_per}, ($(cat ${table_list_file}|sort -n|uniq|wc -l):${skip_count})/${#table[@]}"
                    f_logging "$(eval echo ${log_addr}):INFO" "${thread_info} ${show_region} ] [ ${__} ] [ 数据一致"
                else
                    count1="${host_list[0]}:${mysql_port1}:$(cat ${md5sum_file1}|awk -F: '{print $3}')"
                    count2="${host_list[1]}:${mysql_port2}:$(cat ${md5sum_file2}|awk -F: '{print $3}')"
                    __="${thread_info} ${show_region} ] [ 数据不一致 第 $((${error_count}+1)) 次 ] [ ${count1},${count2}"
                    f_logging "$(eval echo ${log_addr}):WARN" "${thread_info} ${show_region} ] [ 数据不一致 第 $((${error_count}+1)) 次 ] [ ${count1},${count2}"
                    [ "${error_mark}x" == "falsex" ] && echo "${_sql}" >> ${error_log_table_file}
                    error_mark="true"
                fi
                [ -f "${md5sum_file1}" ] && rm -f ${md5sum_file1}
                [ -f "${md5sum_file2}" ] && rm -f ${md5sum_file2}
            done

            f_logging "$(eval echo ${log_addr}):INFO" "${thread_info} 检查完毕"

            [ ! -f "${stop_file}" ] && echo "${db_name}.${table[${i}]}" >> ${table_list_file}
            echo 1>&3   #执行完就往3里面写一行，这样就实现并行了
        } &
    done
    wait
    [ -p "${pipe_file}" ] && rm -f "${pipe_file}"
    rmdir ${error_log_table_dir} >/dev/null 2>&1 || true #删除空目录
}
